Section Overview

  1. Introduction to Column-Oriented Data Storage

  2. Deep Dive into Parquet

  3. Working with Arrow in R

  4. Querying Parquet with Different Engines

  5. Arrow Datasets for Larger-than-Memory Operations

  6. Partitioning Strategies

  7. Hands-on Workshop: Analysis with PUMS Data

Introduction to Column-Oriented Data Storage

Why should I care about data storage?

Data has to be represented somewhere, both during analysis and when stored.

The shape and characteristics of this representation have a huge impact on performance.

What if you could speed up a key part of your analysis by 30x and reduce your storage by 10x?

Row vs. Column-Oriented Storage

Row-oriented

|ID|Name |Age|City    |
|--|-----|---|--------|
|1 |Alice|25 |New York|
|2 |Bob  |30 |Boston  |
|3 |Carol|45 |Chicago |
  • Efficient for single record access
  • Efficient for appending

Column-oriented

ID:    [1, 2, 3]
Name:  [Alice, Bob, Carol]
Age:   [25, 30, 45]
City:  [New York, Boston, Chicago]
  • Efficient for analytics
  • Better compression

Why Column-Oriented Storage?

  • Analytics typically access a subset of columns
    • “What is the average age by city?”
    • Only needs [Age, City] columns
  • Benefits:
    • Only read needed columns from disk
    • Similar data types stored together
    • Better compression ratios
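
To make the "only the needed columns" point concrete, here is a toy sketch in base R (hypothetical values mirroring the example table): the average-age-by-city query touches only two of the four columns, so a columnar store could skip ID and Name on disk entirely.

```r
# Toy data mirroring the example table above (hypothetical values)
people <- data.frame(
  ID   = c(1, 2, 3),
  Name = c("Alice", "Bob", "Carol"),
  Age  = c(25, 30, 45),
  City = c("New York", "Boston", "Chicago")
)

# "What is the average age by city?" needs only Age and City
avg_by_city <- aggregate(Age ~ City, data = people, FUN = mean)
avg_by_city
```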

Column-Oriented Data is great

And you use column-oriented dataframes already!

… but you are probably still storing that data in a fundamentally row-oriented format, like CSV.
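
You can see the in-memory side of this in base R itself: a data.frame is literally a list of column vectors, so your data is already columnar during analysis.

```r
# A data.frame is a list of equal-length column vectors
df <- data.frame(id = 1:3, name = c("Alice", "Bob", "Carol"))

is.list(df)    # TRUE: under the hood it is a list...
df[["name"]]   # ...so grabbing one whole column needs no row scanning
```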

The interconnection problem

What is Apache Arrow?

  • Cross-language development platform for in-memory data
    • Consistent in-memory columnar data format
    • Language-independent
    • Zero-copy reads
  • Benefits:
    • Seamless data interchange between systems
    • Fast analytical processing
    • Efficient memory usage

What is Apache Parquet?

  • Open-source columnar storage format
    • Created by Twitter and Cloudera in 2013
    • Part of the Apache Software Foundation
  • Features:
    • Columnar storage
    • Efficient compression
    • Explicit schema
    • Statistical metadata

Reading a File

As a CSV file

system.time({
  df <- read.csv("CA_person_2021.csv")
})
   user  system elapsed 
 14.449   0.445  15.037 

Reading a File

As a Parquet file

library(arrow)
options(arrow.use_altrep = FALSE)

system.time({
  df <- read_parquet("CA_person_2021.parquet")
})
   user  system elapsed 
  1.017   0.207   0.568 

Deep Dive into Parquet

What is Parquet?

  • Schema metadata
    • Self-describing format
    • Preserves column types
    • Type-safe data interchange
  • Encodings
    • Dictionary encoding: particularly effective for categorical data
    • Run-length encoding: efficient storage of sequential repeated values
  • Advanced compression
    • Column-specific compression algorithms
    • Both dictionary and value compression
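
A minimal sketch of why these encodings matter, assuming the arrow package is installed (file paths use tempfile() and are illustrative): a highly repetitive column compresses dramatically better in Parquet than in CSV, because dictionary and run-length encoding collapse the repeats while CSV stores every value verbatim.

```r
library(arrow)

# A highly repetitive column is ideal for dictionary/run-length encoding
repetitive <- data.frame(state = rep(c("CA", "NY"), each = 50000))

csv_path <- tempfile(fileext = ".csv")
pq_path  <- tempfile(fileext = ".parquet")
write.csv(repetitive, csv_path, row.names = FALSE)
write_parquet(repetitive, pq_path)

# CSV stores 100,000 literal strings; Parquet collapses the repetition
file.size(csv_path) / file.size(pq_path)
```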

Exercise

data <- tibble::tibble(
  integers = 1:10,
  doubles = as.numeric(1:10),
  strings = sprintf("%02d", 1:10)
)

write.csv(data, "numeric_base.csv", row.names = FALSE)
write_csv_arrow(data, "numeric_arrow.csv")
write_parquet(data, "numeric.parquet")

df_csv <- read.csv("numeric_base.csv")
df_csv_arrow <- read_csv_arrow("numeric_arrow.csv")
df_parquet <- read_parquet("numeric.parquet")

Are there any differences?

Exercise

> df_csv_arrow
# A tibble: 10 × 3
   integers doubles strings
      <int>   <int>   <int>
 1        1       1       1
 2        2       2       2
 3        3       3       3
 4        4       4       4
 5        5       5       5
 6        6       6       6
 7        7       7       7
 8        8       8       8
 9        9       9       9
10       10      10      10
> df_parquet
# A tibble: 10 × 3
   integers doubles strings
      <int>   <dbl> <chr>  
 1        1       1 01     
 2        2       2 02     
 3        3       3 03     
 4        4       4 04     
 5        5       5 05     
 6        6       6 06     
 7        7       7 07     
 8        8       8 08     
 9        9       9 09     
10       10      10 10     


Inside a Parquet File

Benchmarks: Parquet vs CSV

Reading Efficiency: Selecting Columns

  • With CSV:
    • Must read entire file, even if you only need a few columns
    • No efficient way to skip columns during read
  • With Parquet:
    • Read only needed columns from disk
    • Significant performance benefit for wide tables
system.time({
  df_subset <- read_parquet(
    "CA_person_2021.parquet", 
    col_select = c("PUMA", "COW")
  )
})
   user  system elapsed 
  0.027   0.003   0.031 
Compare to reading the full file:

   user  system elapsed 
  1.017   0.207   0.568 

nanoparquet vs. arrow Reader

  • nanoparquet
    • Lightweight Parquet reader
    • Minimal dependencies
    • Good for embedding
  • arrow
    • Full-featured reader
    • Support for datasets
    • Integration with Arrow ecosystem
    • Optimized for analytics workloads

nanoparquet vs. arrow Reader

library(arrow)
options(arrow.use_altrep = FALSE)

system.time({
  df <- read_parquet("CA_person_2021.parquet")
})
   user  system elapsed 
  1.017   0.207   0.568 


library(nanoparquet)

system.time({
  df <- read_parquet("CA_person_2021.parquet")
})
   user  system elapsed 
  0.709   0.099   0.894 

Parquet Tooling Ecosystem

Languages with native Parquet support:

  • R (via arrow)
  • Python (via pyarrow, pandas)
  • Java
  • C++
  • Rust
  • JavaScript
  • Go

Parquet Tooling Ecosystem

Systems with Parquet integration:

  • Apache Spark
  • Apache Hadoop
  • Apache Drill
  • Snowflake
  • Amazon Athena
  • Google BigQuery
  • DuckDB

Working with Parquet files with Arrow in R

Introduction to the arrow Package

# Install and load the Arrow package
install.packages("arrow")
library(arrow)

# Check Arrow version and capabilities
arrow_info()
  • The arrow package provides:
    • Native R interface to Apache Arrow
    • Tools for working with large datasets
    • Integration with dplyr for data manipulation
    • Reading/writing various file formats

Reading and Writing Parquet files, revisited

# Read a Parquet file into R
data <- read_parquet("CA_person_2021.parquet")

# Write an R data frame to Parquet
write_parquet(data, "CA_person_2021_new.parquet")

# Reading a subset of columns
df_subset <- read_parquet(
  "CA_person_2021.parquet", 
  col_select = c("PUMA", "COW", "AGEP")
)

# Reading with a row filter (predicate pushdown)
library(dplyr)
df_filtered <- open_dataset("CA_person_2021.parquet") |> 
  filter(AGEP > 40) |>
  collect()

Demo: Using dplyr with arrow

# Create an Arrow Table
df <- read_parquet("CA_person_2021.parquet", as_data_frame = FALSE)

# Use dplyr verbs with arrow tables
df |>
  filter(AGEP >= 16) |>
  summarize(
    mean_commute_time = sum(JWMNP * PWGTP, na.rm = TRUE) /
      sum(PWGTP),
    count = n()
  ) 

Querying Parquet with Different Engines

Introduction to DuckDB

  • Analytical SQL database system
    • Embedded database (like SQLite)
    • Column oriented
    • In-process query execution
  • Key features:
    • Direct Parquet querying
    • Vectorized query execution
    • Parallel processing
    • Zero-copy integration with arrow

DuckDB

library(duckdb)

con <- dbConnect(duckdb())

# Register a Parquet file as a virtual table
dbExecute(con, "CREATE VIEW pums AS SELECT * 
                FROM read_parquet('CA_person_2021.parquet')")

# Run our query
dbGetQuery(con, "
  SELECT SUM(JWMNP * PWGTP)/SUM(PWGTP) as avg_commute_time,
         COUNT(*) as count
  FROM pums
  WHERE AGEP >= 16
")

dbDisconnect(con, shutdown = TRUE)

duckplyr

library(duckplyr)

# Read data with Arrow
pums_data <- read_file_duckdb(
  "CA_person_2021.parquet", 
  "read_parquet"
)

# Use duckplyr to optimize dplyr operations
pums_data |>
  filter(AGEP >= 16) |>
  summarize(
    mean_commute_time = sum(JWMNP * PWGTP, na.rm = TRUE) /
      sum(PWGTP),
    count = n()
  ) |>
  collect()

data.table

library(arrow)
library(data.table)

# Read Parquet file with Arrow
pums_data <- read_parquet("CA_person_2021.parquet")

# Convert to data.table
pums_dt <- as.data.table(pums_data)

# data.table query
pums_dt[AGEP >= 16,
  .(avg_commute_time = sum(JWMNP * PWGTP, na.rm = TRUE) / sum(PWGTP), 
    count = .N)]

Arrow Query Execution

# Create an Arrow Dataset
pums_ds <- open_dataset("pums_dataset_dir/")

# Execute query with Arrow
result <- pums_ds |>
  filter(AGEP >= 16) |>
  group_by(ST) |>
  summarize(
    avg_commute_time = mean(JWMNP, na.rm = TRUE),
    count = n()
  ) |>
  arrange(desc(avg_commute_time)) |>
  collect()

Demo: Seamless Integration Arrow ↔︎ DuckDB

df <- read_parquet("CA_person_2021.parquet", as_data_frame = FALSE)

# Use dplyr verbs with arrow tables
df |>
  filter(AGEP >= 16) |>
  to_duckdb() |>
  summarize(
    mean_commute_time = sum(JWMNP * PWGTP, na.rm = TRUE) /
      sum(PWGTP),
    count = n()
  ) 

Arrow Datasets for Larger-than-Memory Operations

Understanding Arrow Datasets vs. Tables

Arrow Table

  • In-memory data structure
  • Must fit in RAM
  • Fast operations
  • Similar to base data frames
  • Good for single file data

Arrow Dataset

  • Collection of files
  • Lazily evaluated
  • Larger-than-memory capable
  • Distributed execution
  • Supports partitioning

Demo: Opening and Querying Multi-file Datasets

pums_ds <- open_dataset("data/person")

# Examine the dataset, list files
print(pums_ds)
head(pums_ds$files)

# Query execution with lazy evaluation
pums_ds |>
  filter(AGEP >= 16) |>
  group_by(year, ST) |>
  summarize(
    mean_commute_time = sum(JWMNP * PWGTP, na.rm = TRUE) /
      sum(PWGTP),
    count = n()
  ) |>
  collect()

Lazy Evaluation and Query Optimization

  • Lazy evaluation workflow:
    1. Define operations (filter, group, summarize)
    2. Arrow builds an execution plan
    3. Optimizes the plan (predicate pushdown, etc.)
    4. Only reads necessary data from disk
    5. Executes when collect() is called
  • Benefits:
    • Minimizes memory usage
    • Reduces I/O operations
    • Leverages Arrow’s native compute functions
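
The workflow above can be seen directly in a session. A small sketch, assuming the arrow and dplyr packages are installed (the table and column names here are made up): building the query returns a plan object, and nothing is computed until collect().

```r
library(arrow)
library(dplyr)

tbl <- arrow_table(x = 1:5, grp = c("a", "a", "b", "b", "b"))

# Building the query does no work yet: we get a query object back
q <- tbl |>
  filter(grp == "b") |>
  summarize(total = sum(x))

class(q)     # an arrow_dplyr_query: a plan, not a result

# Execution (and any optimization) happens only at collect()
collect(q)
```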

Working with Datasets on S3

arrow can work with data and datasets in cloud storage. This can be a good option if you don’t have access to a formal DBMS.

  • Easy to store
  • arrow efficiently uses metadata to read only what is necessary

Demo: Working with Datasets on S3

pums_ds <- open_dataset("s3://scaling-arrow-pums/person/")

# Query execution with lazy evaluation
pums_ds |>
  filter(year == 2021, location == "ca", AGEP >= 16) |>
  group_by(year, ST) |>
  summarize(
    mean_commute_time = sum(JWMNP * PWGTP, na.rm = TRUE) /
      sum(PWGTP),
    count = n()
  ) |>
  collect()

Demo: Sipping data

pums_ds <- open_dataset("s3://scaling-arrow-pums/person/")

# Query execution with lazy evaluation
pums_ds |>
  filter(AGEP >= 97) |>
  collect()

Partitioning Strategies

What is Partitioning?

  • Dividing data into logical segments
    • Stored in separate files/directories
    • Based on one or more column values
    • Enables efficient filtering
  • Benefits:
    • Faster queries that filter on partition columns
    • Improved parallel processing
    • Easier management of large datasets

Hive vs. Non-Hive Partitioning

Hive Partitioning

  • Directory format: column=value

  • Example:

    person/
    ├── year=2018/
    │   ├── state=NY/
    │   │   └── data.parquet
    │   └── state=CA/
    │       └── data.parquet
    ├── year=2019/
    │   ├── ...
  • Self-describing structure

  • Standard in big data ecosystem

Non-Hive Partitioning

  • Directory format: value

  • Example:

    person/
    ├── 2018/
    │   ├── NY/
    │   │   └── data.parquet
    │   └── CA/
    │       └── data.parquet
    ├── 2019/
    │   ├── ...
  • Column names must be supplied at read time

  • Less verbose directory names
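
The difference shows up directly in the arrow API. A minimal sketch, assuming the arrow package is installed (the data and paths are made up, written to tempdir()): with hive_style = FALSE the directories carry only values, so the partition column names must be supplied when opening the dataset.

```r
library(arrow)

tmp <- file.path(tempdir(), "person_nonhive")
df <- data.frame(year  = c(2018L, 2018L, 2019L),
                 state = c("NY", "CA", "NY"),
                 n     = 1:3)

# hive_style = FALSE writes plain "2018/NY/..." directories
write_dataset(df, tmp,
              partitioning = c("year", "state"),
              hive_style = FALSE)

# The directory names carry values but not column names,
# so we name the partition columns ourselves at read time
ds <- open_dataset(tmp, partitioning = c("year", "state"))
names(ds)
```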

Effective Partitioning Strategies

  • Choose partition columns wisely:
    • Low to medium cardinality
    • Commonly used in filters
    • Balanced data distribution
  • Common partition dimensions:
    • Time (year, month, day)
    • Geography (country, state, region)
    • Category (product type, department)
    • Source (system, sensor)

Partitioning in Practice: Writing Datasets

ca_pums_data <- read_parquet("CA_person_2021.parquet")

ca_pums_data |>
  mutate(
    age_group = case_when(
      AGEP < 18 ~ "under_18",
      AGEP < 30 ~ "18_29",
      AGEP < 45 ~ "30_44",
      AGEP < 65 ~ "45_64",
      TRUE ~ "65_plus"
    )
  ) |>
  group_by(ST, age_group) |>
  write_dataset(
    path = "ca_pums_by_age/"
  )

Demo: repartitioning the whole dataset

pums_data <- open_dataset("data/person")

pums_data |>
  mutate(
    age_group = case_when(
      AGEP < 18 ~ "under_18",
      AGEP < 30 ~ "18_29",
      AGEP < 45 ~ "30_44",
      AGEP < 65 ~ "45_64",
      TRUE ~ "65_plus"
    )
  ) |>
  group_by(year, ST, age_group) |>
  write_dataset(
    path = "pums_by_age/"
  )

Best Practices for Partition Design

  • Avoid over-partitioning:
    • Too many small files = poor performance
    • Target file size: 20MB–2GB
    • Avoid high-cardinality columns (e.g., user_id)
  • Consider query patterns:
    • Partition by commonly filtered columns
    • Balance between read speed and write complexity
  • Nested partitioning considerations:
    • Order from highest to lowest selectivity
    • Limit partition depth (2-3 levels typically sufficient)
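
A quick way to spot over-partitioning is simply to count the files a write produces. A sketch, assuming the arrow package is installed (toy data in tempdir(), column names made up): partitioning on a high-cardinality column like user_id yields one tiny file per value.

```r
library(arrow)

tmp <- file.path(tempdir(), "over_partitioned")
df <- data.frame(user_id = 1:100, value = runif(100))

# Partitioning on a high-cardinality column: one tiny file per user
write_dataset(df, tmp, partitioning = "user_id")

files <- list.files(tmp, recursive = TRUE)
length(files)                                     # one file per user_id
summary(file.size(file.path(tmp, files)) / 1e6)   # sizes in MB
```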

Partitioning Performance Impact

open_dataset("<path/to/data>") |>
  filter(year >= 2018) |>
  summarise(
    mean_commute = sum(JWMNP * PWGTP, na.rm = TRUE) / sum(PWGTP)
  ) |>
  collect()

Hands-on Workshop: Analysis with PUMS Data

The PUMS Dataset

  • Public Use Microdata Sample
    • US Census Bureau data
    • Individual person and household records
    • Anonymized demographic information
    • Income, education, housing, commute, etc.
  • Dataset characteristics:
    • Multiple years (2005-2022)
    • All US states and territories
    • 53 million rows (person records only)
    • 200+ variables

Reading Partitioned PUMS Data

library(arrow)
library(dplyr)

# Open the PUMS dataset
pums_path <- "data/person"  # Partitioned by year and location
pums_ds <- open_dataset(pums_path)

# Examine the dataset
pums_ds

# Look at the first few file paths
head(pums_ds$files)

# Examine schema
pums_ds$schema

Exercise 1: Basic Filtering and Aggregation

# Calculate average income by state for 2021
result1 <- pums_ds |>
  filter(year == 2021) |>
  filter(PINCP > 0) |>  # Positive income only
  group_by(location) |>
  summarize(
    avg_income = mean(PINCP, na.rm = TRUE),
    median_income = quantile(PINCP, 0.5, na.rm = TRUE),
    n = n()
  ) |>
  arrange(desc(avg_income)) |>
  collect()

# View results
head(result1)

Exercise 2: Using DuckDB with PUMS Data

library(arrow)
library(duckdb)
library(DBI)

# Open the PUMS dataset with Arrow
pums_ds <- open_dataset("data/person")

# Filter to just 2021 data for Washington
wa_2021 <- pums_ds |>
  filter(year == 2021, location == "wa") |>
  collect()

# Create DuckDB connection
con <- dbConnect(duckdb())

# Register Arrow Table with DuckDB
duckdb::duckdb_register_arrow(con, "pums_wa", wa_2021)

# Run SQL query
result <- dbGetQuery(con, "
  SELECT 
    CASE 
      WHEN AGEP < 18 THEN 'Under 18'
      WHEN AGEP < 30 THEN '18-29'
      WHEN AGEP < 45 THEN '30-44'
      WHEN AGEP < 65 THEN '45-64'
      ELSE '65+'
    END AS age_group,
    AVG(JWMNP) AS avg_commute_time,
    COUNT(*) AS n
  FROM pums_wa
  WHERE JWMNP > 0
  GROUP BY age_group
  ORDER BY age_group
")

# Disconnect when done
dbDisconnect(con, shutdown = TRUE)

Challenge: Formulate Your Own Analysis

  • Choose a research question:
    • How has commute time changed over the years?
    • What’s the relationship between education and income?
    • How does housing cost burden vary by state?
    • Your own question…
  • Implement the analysis using:
    • Arrow Dataset operations
    • DuckDB SQL queries
    • Data visualization
  • Compare performance between approaches

Conclusion

  • Column-oriented storage formats like Parquet provide massive performance advantages for analytical workloads (30x speed, 10x smaller files)
  • Apache Arrow enables seamless data interchange between systems without costly serialization/deserialization
  • Multiple query engines (arrow, DuckDB, data.table) offer flexibility depending on your analysis needs, all using modern formats like Parquet
  • Partitioning strategies help manage large datasets effectively when working with data too big for memory

Conclusion

Resources:

Questions?